package com.atguigu.gmall.realtime.app.dwm;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.app.func.DimAsyncFunction;
import com.atguigu.gmall.realtime.beans.OrderDetail;
import com.atguigu.gmall.realtime.beans.OrderInfo;
import com.atguigu.gmall.realtime.beans.OrderWide;
import com.atguigu.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

/**
 * Author: Felix
 * Date: 2021/12/29
 * Desc: 订单宽表
 */
public class OrderWideApp {
    public static void main(String[] args) throws Exception {
        //TODO 1.基本环境的准备
        //1.1 设置流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.2 设置并行度\
        env.setParallelism(4);
        /*
        //TODO 2.检查点相关的设置
        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000L));
        env.setStateBackend(new FsStateBackend("hdfs://hadoop202:8020/ck"));
        System.setProperty("HADOOP_USER_NAME", "atguigu");
        */

        //TODO 3.从kafka中读取数据
        //3.1 声明消费的主题以及消费者组
        String orderInfoTopic = "dwd_order_info";
        String orderDetailTopic = "dwd_order_detail";
        String groupId = "order_wide_app_group";
        //3.2 创建消费者对象
        FlinkKafkaConsumer<String> orderInfoSource = MyKafkaUtil.getKafkaSource(orderInfoTopic, groupId);
        FlinkKafkaConsumer<String> orderDetailSource = MyKafkaUtil.getKafkaSource(orderDetailTopic, groupId);
        //3.3 消费数据 封装为流
        DataStreamSource<String> orderInfoStrDS = env.addSource(orderInfoSource);
        DataStreamSource<String> orderDetailStrDS = env.addSource(orderDetailSource);

        //TODO 4.对读取的数据进行类型的转换  jsonStr-->实体类对象
        //4.1 订单
        SingleOutputStreamOperator<OrderInfo> orderInfoDS = orderInfoStrDS.map(
            new RichMapFunction<String, OrderInfo>() {
                private SimpleDateFormat sdf;

                @Override
                public void open(Configuration parameters) throws Exception {
                    sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                }

                @Override
                public OrderInfo map(String jsonStr) throws Exception {
                    OrderInfo orderInfo = JSON.parseObject(jsonStr, OrderInfo.class);
                    orderInfo.setCreate_ts(sdf.parse(orderInfo.getCreate_time()).getTime());
                    return orderInfo;
                }
            }
        );
        //4.2 订单明细
        SingleOutputStreamOperator<OrderDetail> orderDetailDS = orderDetailStrDS.map(
            new RichMapFunction<String, OrderDetail>() {
                private SimpleDateFormat sdf;

                @Override
                public void open(Configuration parameters) throws Exception {
                    sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                }

                @Override
                public OrderDetail map(String jsonStr) throws Exception {
                    OrderDetail orderDetail = JSON.parseObject(jsonStr, OrderDetail.class);
                    orderDetail.setCreate_ts(sdf.parse(orderDetail.getCreate_time()).getTime());
                    return orderDetail;
                }
            }
        );

        //orderInfoDS.print(">>>");
        //orderDetailDS.print("###");

        //TODO 5. 指定Watermark以及提取事件时间字段
        //5.1 订单
        SingleOutputStreamOperator<OrderInfo> orderInfoWithWatermarkDS = orderInfoDS.assignTimestampsAndWatermarks(
            WatermarkStrategy
                .<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(
                    new SerializableTimestampAssigner<OrderInfo>() {
                        @Override
                        public long extractTimestamp(OrderInfo orderInfo, long recordTimestamp) {
                            return orderInfo.getCreate_ts();
                        }
                    }
                )
        );
        //5.2 订单明细
        SingleOutputStreamOperator<OrderDetail> orderDetailWithWatermarkDS = orderDetailDS.assignTimestampsAndWatermarks(
            WatermarkStrategy
                .<OrderDetail>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(
                    new SerializableTimestampAssigner<OrderDetail>() {
                        @Override
                        public long extractTimestamp(OrderDetail orderDetail, long recordTimestamp) {
                            return orderDetail.getCreate_ts();
                        }
                    }
                )
        );

        //TODO 6. 使用keyby进行分组----指定连接条件
        //6.1 订单
        KeyedStream<OrderInfo, Long> orderInfoKeyedDS = orderInfoWithWatermarkDS.keyBy(OrderInfo::getId);
        //6.2 订单明细
        KeyedStream<OrderDetail, Long> orderDetailKeyedDS = orderDetailWithWatermarkDS.keyBy(OrderDetail::getOrder_id);

        //TODO 7.使用intervalJoin 完成双流join
        SingleOutputStreamOperator<OrderWide> joinDS = orderInfoKeyedDS
            .intervalJoin(orderDetailKeyedDS)
            .between(Time.seconds(-5), Time.seconds(5))
            .process(
                new ProcessJoinFunction<OrderInfo, OrderDetail, OrderWide>() {
                    @Override
                    public void processElement(OrderInfo orderInfo, OrderDetail orderDetail, Context ctx, Collector<OrderWide> out) throws Exception {
                        out.collect(new OrderWide(orderInfo, orderDetail));
                    }
                }
            );


        //joinDS.print(">>>>>>");

        //TODO 8.和用户维度进行关联
        /*joinDS.map(
            new MapFunction<OrderWide, OrderWide>() {
                @Override
                public OrderWide map(OrderWide orderWide) throws Exception {
                    Long userId = orderWide.getUser_id();
                    //根据用户id到phoenix用户维度表中查询用户数据
                    JSONObject dimInfoJsonObj = DimUtil.getDimInfoNoCache("dim_user_info", Tuple2.of("id", userId.toString()));
                    String gender = dimInfoJsonObj.getString("GENDER");

                    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
                    String birthdayStr = dimInfoJsonObj.getString("BIRTHDAY");
                    Long birthDayTime = sdf.parse(birthdayStr).getTime();
                    Long currentTimeMillis = System.currentTimeMillis();

                    Long ageLong = (currentTimeMillis - birthDayTime) / 1000 / 60 / 60 / 24 / 365L;

                    orderWide.setUser_age(ageLong.intValue());

                    orderWide.setUser_gender(gender);
                    return orderWide;
                }
            }
        );*/

        SingleOutputStreamOperator<OrderWide> orderWideWithUserInfoDS = AsyncDataStream.unorderedWait(
            joinDS,
            new DimAsyncFunction<OrderWide>("dim_user_info") {
                @Override
                public void join(OrderWide orderWide, JSONObject dimJsonObj) throws Exception {
                    String gender = dimJsonObj.getString("GENDER");

                    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
                    String birthdayStr = dimJsonObj.getString("BIRTHDAY");
                    Long birthDayTime = sdf.parse(birthdayStr).getTime();
                    Long currentTimeMillis = System.currentTimeMillis();

                    Long ageLong = (currentTimeMillis - birthDayTime) / 1000 / 60 / 60 / 24 / 365L;

                    orderWide.setUser_age(ageLong.intValue());

                    orderWide.setUser_gender(gender);
                }

                @Override
                public String getKey(OrderWide orderWide) {
                    return orderWide.getUser_id().toString();
                }
            },
            60,
            TimeUnit.SECONDS
        );

        //orderWideWithUserInfoDS.print(">>>>>>");

        //TODO 9.和地区维度进行关联
        SingleOutputStreamOperator<OrderWide> orderWideWithProvinceDS = AsyncDataStream.unorderedWait(
            orderWideWithUserInfoDS,
            new DimAsyncFunction<OrderWide>("DIM_BASE_PROVINCE") {
                @Override
                public void join(OrderWide orderWide, JSONObject provinceDim) throws Exception {
                    orderWide.setProvince_name(provinceDim.getString("NAME"));
                    orderWide.setProvince_area_code(provinceDim.getString("AREA_CODE"));
                    orderWide.setProvince_iso_code(provinceDim.getString("ISO_CODE"));
                    orderWide.setProvince_3166_2_code(provinceDim.getString("ISO_3166_2"));
                }

                @Override
                public String getKey(OrderWide orderWide) {
                    return orderWide.getProvince_id().toString();
                }
            },
            60, TimeUnit.SECONDS
        );

        //orderWideWithProvinceDS.print(">>>>");

        //TODO 10.和sku维度进行关联
        SingleOutputStreamOperator<OrderWide> orderWideWithSkuDS = AsyncDataStream.unorderedWait(
            orderWideWithProvinceDS,
            new DimAsyncFunction<OrderWide>("DIM_SKU_INFO") {
                @Override
                public void join(OrderWide orderWide, JSONObject dimJsonObj) throws Exception {
                    orderWide.setSku_name(dimJsonObj.getString("SKU_NAME"));
                    orderWide.setCategory3_id(dimJsonObj.getLong("CATEGORY3_ID"));
                    orderWide.setSpu_id(dimJsonObj.getLong("SPU_ID"));
                    orderWide.setTm_id(dimJsonObj.getLong("TM_ID"));
                }

                @Override
                public String getKey(OrderWide orderWide) {
                    return orderWide.getSku_id().toString();
                }
            },
            60, TimeUnit.SECONDS
        );

        //orderWideWithSkuDS.print(">>>>");

        //TODO 11.和SPU维度进行关联
        SingleOutputStreamOperator<OrderWide> orderWideWithSpuDS = AsyncDataStream.unorderedWait(
            orderWideWithSkuDS,
            new DimAsyncFunction<OrderWide>("DIM_SPU_INFO") {
                @Override
                public void join(OrderWide orderWide, JSONObject jsonObject) throws Exception {
                    orderWide.setSpu_name(jsonObject.getString("SPU_NAME"));
                }

                @Override
                public String getKey(OrderWide orderWide) {
                    return String.valueOf(orderWide.getSpu_id());
                }
            }, 60, TimeUnit.SECONDS);

        //TODO 12.和品类维度进行关联
        SingleOutputStreamOperator<OrderWide> orderWideWithCategory3DS = AsyncDataStream.unorderedWait(
            orderWideWithSpuDS,
            new DimAsyncFunction<OrderWide>("DIM_BASE_CATEGORY3") {
                @Override
                public void join(OrderWide orderWide, JSONObject jsonObject) throws Exception {
                    orderWide.setCategory3_name(jsonObject.getString("NAME"));
                }

                @Override
                public String getKey(OrderWide orderWide) {
                    return String.valueOf(orderWide.getCategory3_id());
                }
            }, 60, TimeUnit.SECONDS);

        //TODO 13.和品牌维度进行关联
        SingleOutputStreamOperator<OrderWide> orderWideWithTmDS = AsyncDataStream.unorderedWait(
            orderWideWithCategory3DS, new DimAsyncFunction<OrderWide>("DIM_BASE_TRADEMARK") {
                @Override
                public void join(OrderWide orderWide, JSONObject jsonObject) throws Exception {
                    orderWide.setTm_name(jsonObject.getString("TM_NAME"));
                }

                @Override
                public String getKey(OrderWide orderWide) {
                    return String.valueOf(orderWide.getTm_id());
                }
            }, 60, TimeUnit.SECONDS);

        orderWideWithTmDS.print(">>>>");

        //TODO 14.将订单宽表数据写到kafka的dwm_order_wide主题中
        orderWideWithTmDS
            .map(orderWide->JSON.toJSONString(orderWide))
            .addSink(MyKafkaUtil.getKafkaSink("dwm_order_wide"));

        env.execute();
    }
}
